Spark Catalyst Optimizer

1 定义

使用高级语言特性(如Scala的模式匹配quasi quote)构建的可扩展的查询优化器

Quasi quote可用于在运行时更新组合表达式的代码

2 目的

  • 易于添加新的优化措施,尤其在大数据场景
  • 易于扩展,如增加新的数据源规则、整合外部数据源、添加新的数据类型等

3 术语

树:节点对象的组合。其中,节点由0或多个子节点组成,不可变,且可以应用函数转换

如下:

1
Add(Attribute(x), Add(Literal(1), Literal(2)))

Catalyst blog figure 1

规则:应用规则将树转换为另一棵树,通常使用模式匹配替换子树的形式实现

Catalyst中使用transform对树的节点递归处理,并且可以一次性匹配多条规则

1
2
3
4
5
tree.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
case Add(left, Literal(0)) => left
case Add(Literal(0), right) => right
}

实际上,为了充分转换,规则分批迭代应用,直到树不再变化。

规则中可以包含任意的Scala代码,因此可以实现基于语言的优化和调试。

4 在Spark SQL中使用Catalyst

Catalyst blog figure 2

在四个阶段中使用Catalyst:

  • 处理引用,在分析逻辑计划时
  • 逻辑计划优化
  • 生成物理计划
  • 转换部分查询为Java字节码,在代码生成时

其中,生成物理计划时,Catalyst会生成多个物理计划,然后根据成本选择。而其他阶段纯粹根据规则。

Catalyst包含了节点表达式、数据类型、逻辑和物理操作符等。

(1) 分析

无论是SQL Parser生成的抽象语法树AST,还是使用API构建的DataFrame对象,Spark SQL从一个包含没有解析属性引用的关系开始计算。如SELECT col FROM sales,检索表时并不知道其中的字段是否存在。

Spark SQL使用规则和Catalog对象跟踪所有数据源中的所有表,以便解析属性。从一个没有限定属性和数据类型的未解析逻辑计划开始,应用以下规则:(代码行数>1000行)

  • 从Catalog中按照名称查找关系
  • 映射属性到操作符的子节点
  • 判断相同值的属性,并分配唯一ID。后续用户优化表达式,如col=col
  • 类型传播和转换。转换表达式返回值为期望的类型

(2) 逻辑优化

应用标准规则到逻辑计划,包括常量折叠(constant folding)、谓词下推(predicate pushdown)、投影裁剪(projection pruning)、空值传播(null propagation)、布尔表达式简化(Boolean expression simplification)等。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 示例:扩展SUM表达式的小数类型,以支持64位浮点数
*/
object DecimalAggregates extends Rule[LogicalPlan] {
/** Maximum number of decimal digits in a Long */
val MAX_LONG_DIGITS = 18
def apply(plan: LogicalPlan): LogicalPlan = {
plan transformAllExpressions {
case Sum(e @ DecimalType.Expression(prec, scale))
if prec + 10 <= MAX_LONG_DIGITS =>
MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }
}

(3) 生成物理计划

生成物理计划阶段,Spark SQL根据一个逻辑计划,生成多个物理计划,然后依据成本模型筛选。此时,成本优化仅用于关联算法选择。当然基于成本的优化有更广的用途。物理计划生成同时也支持基于规则的优化,如管道投影(pipling projection)等。此外,可以将物理计划下推到数据源,以支持预测和投影下推。

(4) 代码更新

因为需要基于CPU处理内存数据,因此希望能够支持代码更新,以优化效率。

通常代码更新对于编译器是一件复杂的事情。但是Catalyst使用Scala的quasiquote,使代码更新更加简单。quasiquote支持代码构建抽象语法树,并且可以使Scala编译器在运行时更新字节码。

1
2
3
4
5
6
7
8
9
10
/**
* 没有使用quasiquote时,需要便利AST解析每行数据
* 使用后,可以减少分支和虚函数调用,提升代码执行效率
* 模式匹配?
*/
def compile(node: Node): AST = node match {
case Literal(value) => q"$value"
case Attribute(name) => q"row.get($name)"
case Add(left, right) => q"${compile(left)} + ${compile(right)}"
}

quasiquote会检查类型,以确保准确匹配。是可组合的,不关心子节点返回。生成结果会再基于编译器优化,性能接近人工调试。

参考资料